{
 "cells": [
  {
   "cell_type": "markdown",
   "source": [
    "Distributed Learning with Vineyard\n",
    "==================================\n",
    "\n",
    "With the growth of data, distributed learning is becoming a must in real-world machine learning\n",
    "applications, as the data size can easily exceed the memory limit of a single machine.\n",
    "Thus, many distributed systems addressing different workloads have been developed,\n",
    "and they share the same objective: extending users' single-machine prototypes\n",
    "to distributed settings with as few code modifications as possible.\n",
    "\n",
    "For example, **dask.dataframe** mimics the API of **pandas**, the de-facto standard\n",
    "library for single-machine structured data processing, so that users can run their\n",
    "pandas preprocessing code on a dask cluster with few modifications.\n",
    "Similarly, **horovod** provides easy-to-use APIs for users to port their single-machine\n",
    "code in machine learning frameworks (e.g., TensorFlow, PyTorch, MXNet) to distributed settings\n",
    "with only a few additional lines of code.\n",
    "\n",
    "However, when moving to distributed learning, data sharing between libraries within the same\n",
    "Python process (e.g., pandas and tensorflow) becomes inter-process sharing between engines (e.g.,\n",
    "dask and horovod), and even cross-machine sharing in the distributed setting. Existing solutions\n",
    "based on external distributed file systems are far from optimal due to their huge I/O overheads.\n",
    "\n",
    "Vineyard shares the same design principle as the aforementioned distributed systems: it aims to\n",
    "provide efficient cross-engine data sharing with few modifications to the existing code.\n",
    "Next, we demonstrate how to transfer a single-machine learning example in **keras** to\n",
    "distributed learning with dask, horovod and Vineyard.\n",
    "\n",
    "An Example from Keras\n",
    "---------------------\n",
    "\n",
    "This [example](https://keras.io/examples/structured_data/wide_deep_cross_networks/)\n",
    "uses the Covertype dataset from the UCI Machine Learning Repository.\n",
    "The task is to predict forest cover type from cartographic variables.\n",
    "The dataset includes 506,011 instances with 12 input features:\n",
    "10 numerical features and 2 categorical features.\n",
    "Each instance is categorized into 1 of 7 classes.\n",
    "\n",
    "The solution contains three steps:\n",
    "\n",
    "1. preprocess the data in pandas to extract the 12 features and the label\n",
    "2. store the preprocessed data in files\n",
    "3. define and train the model in keras\n",
    "\n",
    "\n",
    "Mapping the solution to distributed learning, we have:\n",
    "\n",
    "1. preprocess the data in dask.dataframe\n",
    "2. share the preprocessed data using Vineyard\n",
    "3. train the model in horovod.keras\n",
    "\n",
    "\n",
    "We will walk through the code as follows.\n",
    "\n",
    "Setup\n",
    "-------\n",
    "\n",
    "The distributed deployment of vineyard and dask is as follows: on each machine, we launch a vineyard daemon process to handle the local data storage on that machine, and a dask worker on the same machine for the computation. In this notebook, we limit the number of machines to one (i.e., the local machine) for demonstration purposes."
   ],
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "source": [
    "import vineyard\n",
    "import subprocess as sp\n",
    "\n",
    "# connect to the local vineyardd\n",
    "client = vineyard.connect()\n",
    "\n",
    "# launch dask scheduler and worker\n",
    "dask_scheduler = sp.Popen(['dask-scheduler', '--host', 'localhost'])\n",
    "dask_worker = sp.Popen(['dask-worker', 'tcp://localhost:8786'])"
   ],
   "outputs": [],
   "metadata": {}
  },
  {
   "cell_type": "markdown",
   "source": [
    "Preprocessing the data\n",
    "----------------------\n",
    "\n",
    "To read the data, we replace\n",
    "**pd.read_csv** by **dd.read_csv**, which will automatically\n",
    "read the data in parallel."
   ],
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "source": [
    "import dask.dataframe as dd\n",
    "raw_data = dd.read_csv('covtype.data', header=None)"
   ],
   "outputs": [],
   "metadata": {}
  },
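  {
   "cell_type": "markdown",
   "source": [
    "As a minimal, self-contained sketch (with hypothetical toy data, not the real\n",
    "**covtype.data**), **dd.read_csv** mirrors the **pd.read_csv** API while splitting\n",
    "the file into partitions that can be processed in parallel:\n",
    "```python\n",
    "import csv, os, tempfile\n",
    "import dask.dataframe as dd\n",
    "\n",
    "# write a tiny headerless CSV file\n",
    "path = os.path.join(tempfile.mkdtemp(), 'toy.data')\n",
    "with open(path, 'w', newline='') as f:\n",
    "    csv.writer(f).writerows([[1, 2, 3], [4, 5, 6], [7, 8, 9]])\n",
    "\n",
    "toy = dd.read_csv(path, header=None)\n",
    "print(toy.npartitions)       # a small file fits in a single partition: 1\n",
    "print(toy.compute().shape)   # (3, 3), the same result pandas would give\n",
    "```"
   ],
   "metadata": {}
  },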
  {
   "cell_type": "markdown",
   "source": [
    "Then we preprocess the data using the same code as the example,\n",
    "with the only change being the replacement of **pd.concat** with **dd.concat**."
   ],
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "source": [
    "\"\"\"\n",
    "The two categorical features in the dataset are binary-encoded.\n",
    "We will convert this dataset representation to the typical representation, where each\n",
    "categorical feature is represented as a single integer value.\n",
    "\"\"\"\n",
    "import warnings\n",
    "warnings.filterwarnings('ignore')\n",
    "\n",
    "soil_type_values = [f\"soil_type_{idx+1}\" for idx in range(40)]\n",
    "wilderness_area_values = [f\"area_type_{idx+1}\" for idx in range(4)]\n",
    "\n",
    "soil_type = raw_data.loc[:, 14:53].apply(\n",
    "    lambda x: soil_type_values[x.to_numpy().nonzero()[0][0]], axis=1\n",
    ")\n",
    "wilderness_area = raw_data.loc[:, 10:13].apply(\n",
    "    lambda x: wilderness_area_values[x.to_numpy().nonzero()[0][0]], axis=1\n",
    ")\n",
    "\n",
    "CSV_HEADER = [\n",
    "    \"Elevation\",\n",
    "    \"Aspect\",\n",
    "    \"Slope\",\n",
    "    \"Horizontal_Distance_To_Hydrology\",\n",
    "    \"Vertical_Distance_To_Hydrology\",\n",
    "    \"Horizontal_Distance_To_Roadways\",\n",
    "    \"Hillshade_9am\",\n",
    "    \"Hillshade_Noon\",\n",
    "    \"Hillshade_3pm\",\n",
    "    \"Horizontal_Distance_To_Fire_Points\",\n",
    "    \"Wilderness_Area\",\n",
    "    \"Soil_Type\",\n",
    "    \"Cover_Type\",\n",
    "]\n",
    "\n",
    "data = dd.concat(\n",
    "    [raw_data.loc[:, 0:9], wilderness_area, soil_type, raw_data.loc[:, 54]],\n",
    "    axis=1,\n",
    "    ignore_index=True,\n",
    ")\n",
    "data.columns = CSV_HEADER\n",
    "\n",
    "# Convert the target label indices into a range from 0 to 6 (there are 7 labels in total).\n",
    "data[\"Cover_Type\"] = data[\"Cover_Type\"] - 1"
   ],
   "outputs": [],
   "metadata": {}
  },
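  {
   "cell_type": "markdown",
   "source": [
    "To illustrate what the **apply** calls above compute, here is a pandas-only sketch\n",
    "on toy data (4 soil types instead of 40): each row of one-hot columns is decoded\n",
    "into the name of its single nonzero column.\n",
    "```python\n",
    "import pandas as pd\n",
    "\n",
    "# toy vocabulary and two one-hot-encoded rows\n",
    "soil_type_values = ['soil_type_' + str(i + 1) for i in range(4)]\n",
    "onehot = pd.DataFrame([[0, 0, 1, 0],\n",
    "                       [1, 0, 0, 0]])\n",
    "\n",
    "# nonzero()[0][0] finds the index of the 1 in each row\n",
    "decoded = onehot.apply(\n",
    "    lambda x: soil_type_values[x.to_numpy().nonzero()[0][0]], axis=1\n",
    ")\n",
    "print(list(decoded))  # ['soil_type_3', 'soil_type_1']\n",
    "```"
   ],
   "metadata": {}
  },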
  {
   "cell_type": "markdown",
   "source": [
    "Finally, instead of saving the preprocessed data to files, we store it in Vineyard.\n"
   ],
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "source": [
    "import vineyard\n",
    "from vineyard.core.builder import builder_context\n",
    "from vineyard.contrib.dask.dask import register_dask_types\n",
    "\n",
    "with builder_context() as builder:\n",
    "    register_dask_types(builder, None) # register dask builders\n",
    "    gdf_id = client.put(data, dask_scheduler='tcp://localhost:8786')\n",
    "    print(gdf_id)"
   ],
   "outputs": [],
   "metadata": {}
  },
  {
   "cell_type": "markdown",
   "source": [
    "The preprocessed data is now stored as a global dataframe\n",
    "in Vineyard, identified by the ObjectID printed above."
   ],
   "metadata": {}
  },
  {
   "cell_type": "markdown",
   "source": [
    "Training the model\n",
    "------------------\n",
    "\n",
    "In the single-machine solution from the example, a **get_dataset_from_csv** function\n",
    "is defined to load the dataset from the preprocessed data files as follows:\n",
    "```python\n",
    "def get_dataset_from_csv(csv_file_path, batch_size, shuffle=False):\n",
    "\n",
    "    dataset = tf.data.experimental.make_csv_dataset(\n",
    "        csv_file_path,\n",
    "        batch_size=batch_size,\n",
    "        column_names=CSV_HEADER,\n",
    "        column_defaults=COLUMN_DEFAULTS,\n",
    "        label_name=TARGET_FEATURE_NAME,\n",
    "        num_epochs=1,\n",
    "        header=True,\n",
    "        shuffle=shuffle,\n",
    "    )\n",
    "    return dataset.cache()\n",
    "```\n",
    "while the training procedure loads the train_dataset and test_dataset\n",
    "separately from two files:\n",
    "```python\n",
    "def run_experiment(model):\n",
    "\n",
    "    model.compile(\n",
    "        optimizer=keras.optimizers.Adam(learning_rate=learning_rate),\n",
    "        loss=keras.losses.SparseCategoricalCrossentropy(),\n",
    "        metrics=[keras.metrics.SparseCategoricalAccuracy()],\n",
    "    )\n",
    "\n",
    "    train_dataset = get_dataset_from_csv(train_data_file, batch_size, shuffle=True)\n",
    "\n",
    "    test_dataset = get_dataset_from_csv(test_data_file, batch_size)\n",
    "\n",
    "    print(\"Start training the model...\")\n",
    "    history = model.fit(train_dataset, epochs=num_epochs)\n",
    "    print(\"Model training finished\")\n",
    "\n",
    "    _, accuracy = model.evaluate(test_dataset, verbose=0)\n",
    "\n",
    "    print(f\"Test accuracy: {round(accuracy * 100, 2)}%\")\n",
    "```\n",
    "In our solution, we instead provide a function that loads the dataset from the global dataframe\n",
    "generated in the previous step."
   ],
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "source": [
    "from vineyard.core.resolver import resolver_context\n",
    "from vineyard.contrib.ml.tensorflow import register_tf_types\n",
    "\n",
    "def get_dataset_from_vineyard(object_id, batch_size, shuffle=False):\n",
    "    with resolver_context() as resolver:\n",
    "        register_tf_types(None, resolver) # register tf resolvers\n",
    "        ds = vineyard.connect().get(object_id, label=TARGET_FEATURE_NAME) # specify the label column\n",
    "\n",
    "    if shuffle:\n",
    "        ds = ds.shuffle(len(ds))\n",
    "\n",
    "    len_test = int(len(ds) * 0.15)\n",
    "    test_dataset = ds.take(len_test).batch(batch_size)\n",
    "    train_dataset = ds.skip(len_test).batch(batch_size)\n",
    "\n",
    "    return train_dataset, test_dataset"
   ],
   "outputs": [],
   "metadata": {}
  },
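  {
   "cell_type": "markdown",
   "source": [
    "The **take**/**skip** pair above realizes a 15%/85% test/train split. A plain-list\n",
    "sketch of the same arithmetic:\n",
    "```python\n",
    "ds = list(range(100))\n",
    "len_test = int(len(ds) * 0.15)\n",
    "test_part = ds[:len_test]    # analogous to ds.take(len_test)\n",
    "train_part = ds[len_test:]   # analogous to ds.skip(len_test)\n",
    "print(len_test, len(train_part))  # 15 85\n",
    "```"
   ],
   "metadata": {}
  },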
  {
   "cell_type": "markdown",
   "source": [
    "Then we modify the training procedure with a few additional lines of horovod code."
   ],
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "source": [
    "import horovod.keras as hvd\n",
    "\n",
    "def run_experiment(model):\n",
    "\n",
    "    hvd.init()\n",
    "\n",
    "    model.compile(\n",
    "        optimizer=hvd.DistributedOptimizer(keras.optimizers.Adam(learning_rate=learning_rate)),\n",
    "        loss=keras.losses.SparseCategoricalCrossentropy(),\n",
    "        metrics=[keras.metrics.SparseCategoricalAccuracy()],\n",
    "    )\n",
    "\n",
    "    callbacks = [\n",
    "        # Horovod: broadcast initial variable states from rank 0 to all other processes.\n",
    "        # This is necessary to ensure consistent initialization of all workers when\n",
    "        # training is started with random weights or restored from a checkpoint.\n",
    "        hvd.callbacks.BroadcastGlobalVariablesCallback(0),\n",
    "    ]\n",
    "\n",
    "    train_dataset, test_dataset = get_dataset_from_vineyard(gdf_id, batch_size, shuffle=True)\n",
    "\n",
    "    print(\"Start training the model...\")\n",
    "    history = model.fit(train_dataset, epochs=num_epochs, callbacks=callbacks)\n",
    "    print(\"Model training finished\")\n",
    "\n",
    "    _, accuracy = model.evaluate(test_dataset, verbose=0)\n",
    "\n",
    "    print(f\"Test accuracy: {round(accuracy * 100, 2)}%\")"
   ],
   "outputs": [],
   "metadata": {}
  },
  {
   "cell_type": "markdown",
   "source": [
    "All other parts of the training procedure are the same as in the single-machine solution."
   ],
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "source": [
    "TARGET_FEATURE_NAME = \"Cover_Type\"\n",
    "\n",
    "TARGET_FEATURE_LABELS = [\"0\", \"1\", \"2\", \"3\", \"4\", \"5\", \"6\"]\n",
    "\n",
    "NUMERIC_FEATURE_NAMES = [\n",
    "    \"Aspect\",\n",
    "    \"Elevation\",\n",
    "    \"Hillshade_3pm\",\n",
    "    \"Hillshade_9am\",\n",
    "    \"Hillshade_Noon\",\n",
    "    \"Horizontal_Distance_To_Fire_Points\",\n",
    "    \"Horizontal_Distance_To_Hydrology\",\n",
    "    \"Horizontal_Distance_To_Roadways\",\n",
    "    \"Slope\",\n",
    "    \"Vertical_Distance_To_Hydrology\",\n",
    "]\n",
    "\n",
    "CATEGORICAL_FEATURES_WITH_VOCABULARY = {\n",
    "    \"Soil_Type\": soil_type_values,\n",
    "    \"Wilderness_Area\": wilderness_area_values,\n",
    "}\n",
    "\n",
    "CATEGORICAL_FEATURE_NAMES = list(CATEGORICAL_FEATURES_WITH_VOCABULARY.keys())\n",
    "\n",
    "FEATURE_NAMES = NUMERIC_FEATURE_NAMES + CATEGORICAL_FEATURE_NAMES\n",
    "\n",
    "NUM_CLASSES = len(TARGET_FEATURE_LABELS)\n",
    "\n",
    "learning_rate = 0.001\n",
    "dropout_rate = 0.1\n",
    "batch_size = 265\n",
    "num_epochs = 5\n",
    "\n",
    "hidden_units = [32, 32]\n",
    "\n",
    "\"\"\"\n",
    "## Create model inputs\n",
    "Now, define the inputs for the models as a dictionary, where the key is the feature name,\n",
    "and the value is a `keras.layers.Input` tensor with the corresponding feature shape\n",
    "and data type.\n",
    "\"\"\"\n",
    "import math\n",
    "\n",
    "import tensorflow as tf\n",
    "\n",
    "def create_model_inputs():\n",
    "    inputs = {}\n",
    "    for feature_name in FEATURE_NAMES:\n",
    "        if feature_name in NUMERIC_FEATURE_NAMES:\n",
    "            inputs[feature_name] = layers.Input(\n",
    "                name=feature_name, shape=(), dtype=tf.float32\n",
    "            )\n",
    "        else:\n",
    "            inputs[feature_name] = layers.Input(\n",
    "                name=feature_name, shape=(), dtype=tf.string\n",
    "            )\n",
    "    return inputs\n",
    "\n",
    "\n",
    "\"\"\"\n",
    "## Encode features\n",
    "We create two representations of our input features: sparse and dense:\n",
    "1. In the **sparse** representation, the categorical features are encoded with one-hot\n",
    "encoding using the `CategoryEncoding` layer. This representation can be useful for the\n",
    "model to *memorize* particular feature values to make certain predictions.\n",
    "2. In the **dense** representation, the categorical features are encoded with\n",
    "low-dimensional embeddings using the `Embedding` layer. This representation helps\n",
    "the model to *generalize* well to unseen feature combinations.\n",
    "\"\"\"\n",
    "\n",
    "\n",
    "from tensorflow.keras.layers import StringLookup\n",
    "\n",
    "\n",
    "def encode_inputs(inputs, use_embedding=False):\n",
    "    encoded_features = []\n",
    "    for feature_name in inputs:\n",
    "        if feature_name in CATEGORICAL_FEATURE_NAMES:\n",
    "            vocabulary = CATEGORICAL_FEATURES_WITH_VOCABULARY[feature_name]\n",
    "            # Create a lookup to convert string values to integer indices.\n",
    "            # Since we are not using a mask token nor expecting any out-of-vocabulary\n",
    "            # (OOV) tokens, we set mask_token to None and num_oov_indices to 0.\n",
    "            lookup = StringLookup(\n",
    "                vocabulary=vocabulary,\n",
    "                mask_token=None,\n",
    "                num_oov_indices=0,\n",
    "                output_mode=\"int\" if use_embedding else \"binary\",\n",
    "            )\n",
    "            if use_embedding:\n",
    "                # Convert the string input values into integer indices.\n",
    "                encoded_feature = lookup(inputs[feature_name])\n",
    "                embedding_dims = int(math.sqrt(len(vocabulary)))\n",
    "                # Create an embedding layer with the specified dimensions.\n",
    "                embedding = layers.Embedding(\n",
    "                    input_dim=len(vocabulary), output_dim=embedding_dims\n",
    "                )\n",
    "                # Convert the index values to embedding representations.\n",
    "                encoded_feature = embedding(encoded_feature)\n",
    "            else:\n",
    "                # Convert the string input values into a one hot encoding.\n",
    "                encoded_feature = lookup(tf.expand_dims(inputs[feature_name], -1))\n",
    "        else:\n",
    "            # Use the numerical features as-is.\n",
    "            encoded_feature = tf.expand_dims(inputs[feature_name], -1)\n",
    "\n",
    "        encoded_features.append(encoded_feature)\n",
    "\n",
    "    all_features = layers.concatenate(encoded_features)\n",
    "    return all_features\n",
    "\n",
    "\n",
    "\"\"\"\n",
    "## Experiment 1: a baseline model\n",
    "In the first experiment, let's create a multi-layer feed-forward network,\n",
    "where the categorical features are one-hot encoded.\n",
    "\"\"\"\n",
    "from tensorflow import keras\n",
    "from tensorflow.keras import layers\n",
    "\n",
    "def create_baseline_model():\n",
    "    inputs = create_model_inputs()\n",
    "    features = encode_inputs(inputs)\n",
    "\n",
    "    for units in hidden_units:\n",
    "        features = layers.Dense(units)(features)\n",
    "        features = layers.BatchNormalization()(features)\n",
    "        features = layers.ReLU()(features)\n",
    "        features = layers.Dropout(dropout_rate)(features)\n",
    "\n",
    "    outputs = layers.Dense(units=NUM_CLASSES, activation=\"softmax\")(features)\n",
    "    model = keras.Model(inputs=inputs, outputs=outputs)\n",
    "    return model\n",
    "\n",
    "\n",
    "baseline_model = create_baseline_model()"
   ],
   "outputs": [],
   "metadata": {}
  },
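  {
   "cell_type": "markdown",
   "source": [
    "As an aside, **encode_inputs** sizes each embedding as the integer square root of\n",
    "the vocabulary size, so larger vocabularies get modestly larger embeddings:\n",
    "```python\n",
    "import math\n",
    "\n",
    "for vocab_size in (4, 40):\n",
    "    print(vocab_size, int(math.sqrt(vocab_size)))  # 4 -> 2, 40 -> 6\n",
    "```"
   ],
   "metadata": {}
  },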
  {
   "cell_type": "markdown",
   "source": [
    "Let's run it:"
   ],
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "source": [
    "run_experiment(baseline_model)"
   ],
   "outputs": [],
   "metadata": {}
  },
  {
   "cell_type": "markdown",
   "source": [
    "We clean up the environment at the end."
   ],
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "source": [
    "dask_worker.terminate()\n",
    "dask_scheduler.terminate()\n",
    "\n",
    "vineyard.shutdown()"
   ],
   "outputs": [],
   "metadata": {}
  },
  {
   "cell_type": "markdown",
   "source": [
    "Finally, we can use **horovodrun** to run the above code across a cluster for distributed learning on large datasets."
   ],
   "metadata": {}
  },
  {
   "cell_type": "markdown",
   "source": [
    "Conclusion\n",
    "----------\n",
    "\n",
    "From this example, we can see that with the help of Vineyard, users can easily extend\n",
    "their single-machine solutions to distributed learning with dedicated systems, without\n",
    "worrying about cross-system data sharing issues."
   ],
   "metadata": {}
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
